# Workflow Configuration

To create a workflow, simply create a new class and use the `hatchet.workflow` and `hatchet.step` decorators to define the structure of your workflow. For example, a simple 2-step workflow would look like:

```py
from hatchet_sdk import Hatchet

hatchet = Hatchet()

@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    def step1(self, context):
        print("executed step1")
        pass

    @hatchet.step(parents=["step1"])
    def step2(self, context):
        print("executed step2")
        pass
```

You'll notice that the workflow defines a workflow trigger (in this case, `on_events`), and the workflow definition. The workflow definition is a series of steps, which can be defined using the `hatchet.step` decorator. Each step must be a method on the class, and must accept a `context` argument. The `context` argument is a `Context` object, which contains information about the workflow, such as the input data and the output data of previous steps.

To create multi-step workflows, you can use `parents` to define the steps which the current step depends on.

## Workflow Triggers

You can define the following automatic triggers for workflows:

- `on_events`: Trigger the workflow when a specific event is sent to the Hatchet API. See the documentation for [running workflows via events](./run-workflow-events) for more information.
- `on_crons`: Trigger the workflow on a cron schedule. See the documentation for [running workflows via cron schedules](./run-workflow-cron) for more information.

## Retrieving Workflow Input Data

You can get access to the workflow's input data, such as the event data or other specified input data, by using the `context.workflow_input()` method on the `context`. For example, given the following event:

```json
{
  "name": "test"
}
```

You can get access to the event data by doing the following:

```py
@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    def step1(self, context : Context):
        print("executed step1", context.workflow_input())
        pass
```

## Step Outputs

Step outputs should be a `dict` and are optional. For example:

```py
@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    def step1(self, context : Context):
        return {
            "step-1-output": "test"
        }
```

Future steps can access this output by calling `context.step_output("<step>")`. In this example, a future step could access this data via `context.step_output("step1")`.

## Sync vs Async Steps

You can define async steps by using the `async` keyword on the step method. For example:

```py
@hatchet.workflow(on_events=["user:create"])
class MyWorkflow:
    @hatchet.step()
    async def step1(self, context : Context):
        print("executed step1", context.workflow_input())
        await asyncio.sleep(1)
        pass
```

When steps are run with `async`, they will be executed in the main thread using the default asyncio event loop. If an asyncio event loop is not set when the worker is started with `worker.start`, one will be created. If you are using `async` steps, **make sure you are not blocking the event loop**. If you are running blocking code, decorating the method with only `def` will execute it in a separate thread. You can also call `asyncio.run` within your `def` method if you're in need of using `async/await` within a `def` method, but please ensure that you are not using a third-party dependency which expects a shared event loop.

As a quick rule of thumb:

- If you are using `async/await`, use `async def` and ensure you are not blocking the event loop.
- If you are running blocking code, use `def` and ensure you are not using third-party dependencies that expect a shared event loop.

## Timeouts

**The default timeout on Hatchet is 60 seconds per step run**.

You can declare a timeout for a step by passing `timeout` to the `hatchet.step` decorator. Timeouts are strings in the format of `1h`, `1m`, `1s`, etc. For example, to timeout a step after 5 minutes, you can do the following:

```py
@hatchet.step(timeout="5m")
def step1(self, context):
    print("executed step1")
    pass
```

## Termination, Sleeps and Threads

Hatchet spawns a new thread per step, which means that there is a risk of thread leakage if your code is busy outside of the python interpreter. For example, this can happen if you call `time.sleep` within a step. Consider using an `async` method with `await asyncio.sleep` instead.

You can also determine whether to exit the thread by calling `context.done()` within a step, which returns true if the step has been cancelled. For example:

```py
@hatchet.step(timeout="2s")
def step1(self, context):
    while True:
        # this step will gracefully exit after 2 seconds
        if context.done():
            break
        pass
```

If you need control over cancellation, you can also use `context.cancel()` to cancel the current step, though this is not recommended.
